package com.gitee.jastee.kafka.tool;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.gitee.jastee.kafka.consumer.KafkaConsumerClient;
import com.gitee.jastee.util.ParameterTool;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;

import static com.gitee.jastee.kafka.constant.KafkaConstants.*;

/**
 * Command-line helper that reads records from Kafka and writes each one to the log.
 *
 * @author Jast
 * @date 2022-03-03 16:20
 */
public class KafkaConsumerTool {
    private static final Log log = LogFactory.get();

    /** Maximum time a single poll blocks waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(10000);

    /**
     * Polls Kafka in an endless loop and logs the partition, offset and value of every record.
     *
     * <p>The consumer is obtained from {@link KafkaConsumerClient#createConsumer}, which receives
     * the parsed arguments (topic subscription is presumably configured there; it is not done in
     * this method). The method does not return normally: it only ends when an exception is
     * thrown, at which point the consumer is closed.
     *
     * @param args command-line arguments; must contain {@code KAFKA_TOPIC}, may contain
     *     {@code KAFKA_SLEEP_TIME} (milliseconds to pause after each record, default 0)
     * @throws InterruptedException if the thread is interrupted while pausing between records
     */
    public static void consumer(String[] args) throws InterruptedException {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // The value is not used here; the call is kept so a missing topic argument is still
        // rejected up front (presumably getRequired throws when the key is absent).
        parameterTool.getRequired(KAFKA_TOPIC);
        long sleepMillis = parameterTool.getLong(KAFKA_SLEEP_TIME, 0);

        KafkaConsumerClient kafkaConsumerClient = new KafkaConsumerClient();
        // try-with-resources guarantees the consumer is closed when the loop is left through an
        // exception (for example an interrupt during the sleep below).
        try (KafkaConsumer<String, String> consumer =
                kafkaConsumerClient.createConsumer(parameterTool)) {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
                for (ConsumerRecord<String, String> record : records) {
                    log.info(
                            "partition:{},offset:{},data:{}",
                            record.partition(),
                            record.offset(),
                            record.value());
                    // Optional throttle between records, controlled by KAFKA_SLEEP_TIME.
                    Thread.sleep(sleepMillis);
                }
            }
        }
    }
}
